"Kubebuilder v2 Guide" - Part 3: Understanding Kubebuilder

Understanding Kubebuilder

Preface

Nobody likes a black box. Before using a tool, I like to understand it as thoroughly as I can, otherwise it never feels solid. Since the Controller's workflow is already familiar territory, following kubebuilder's source should be fairly easy. So I skimmed through kubebuilder's code, and this post circles a few of the highlights to help build a mental model of it.

Concepts and terms

The following terms are important and will come up repeatedly in this article.

Own Resource

A CRD is usually designed to manage a combination of built-in k8s resources, implementing custom deployment and runtime logic as an upper layer of encapsulation. The wrapped lower-layer built-in resource instances are therefore called Own Resources.

Owner Resource

Correspondingly to the Own Resource above, the CRD resource instance acting as the upper-layer managing unit is called the Owner Resource.
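In practice this owner/own link is recorded in the child object's metadata.ownerReferences. A minimal sketch of mine using controller-runtime's controllerutil helper (the Unit/StatefulSet pairing and the "Unit/api/v1" import path are illustrative assumptions, not from the scaffold):

import (
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/runtime"

	customv1 "Unit/api/v1"

	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// adopt marks unit as the controlling owner of sts: deleting the Unit then
// garbage-collects the StatefulSet, and events on sts enqueue the owning Unit.
func adopt(unit *customv1.Unit, sts *appsv1.StatefulSet, scheme *runtime.Scheme) error {
	return controllerutil.SetControllerReference(unit, sts, scheme)
}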

Kind & Resource

In the vast majority of cases the two map one-to-one by type; for example, every Pod resource belongs to the Pod Kind. You can simply think of Kind as the type label of a resource.

GVK & GVR

GVK = Group + Version + Kind, a term describing a kind of resource; for example, the Deployment Kind's GVK is extensions/v1beta1/Deployment.

GVR = Group + Version + Resource, a term describing the resource as served by the API; for example, Deployment objects are served through the deployments resource, whose GVR is extensions/v1beta1/deployments.
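To make the Kind-to-Resource mapping concrete, here is a small hedged sketch using a RESTMapper (for example the one returned by mgr.GetRESTMapper(); the function name is mine):

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// gvkToGVR resolves the Deployment Kind to its deployments Resource.
func gvkToGVR(mapper meta.RESTMapper) (schema.GroupVersionResource, error) {
	gvk := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return schema.GroupVersionResource{}, err
	}
	return mapping.Resource, nil // {Group: "apps", Version: "v1", Resource: "deployments"}
}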

Scheme

Every resource type needs a corresponding Scheme. The Scheme struct holds the gvkToType and typeToGVK mappings, and the APIServer relies on a Scheme to serialize and deserialize resources.

The Scheme struct looks like this:

type Scheme struct {
	// versionMap allows one to figure out the go type of an object with
	// the given version and name.
	gvkToType map[unversioned.GroupVersionKind]reflect.Type

	// typeToGroupVersion allows one to find metadata for a given go object.
	// The reflect.Type we index by should *not* be a pointer.
	typeToGVK map[reflect.Type][]unversioned.GroupVersionKind

	// unversionedTypes are transformed without conversion in ConvertToVersion.
	unversionedTypes map[reflect.Type]unversioned.GroupVersionKind

	// unversionedKinds are the names of kinds that can be created in the context of any group
	// or version
	// TODO: resolve the status of unversioned types.
	unversionedKinds map[string]reflect.Type

	// Map from version and resource to the corresponding func to convert
	// resource field labels in that version to internal version.
	fieldLabelConversionFuncs map[string]map[string]FieldLabelConversionFunc

	// defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
	// the provided object must be a pointer.
	defaulterFuncs map[reflect.Type]func(interface{})

	// converter stores all registered conversion functions. It also has
	// default coverting behavior.
	converter *conversion.Converter

	// cloner stores all registered copy functions. It also has default
	// deep copy behavior.
	cloner *conversion.Cloner
}
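You normally don't populate these maps by hand; in a kubebuilder project the scaffolded api/v1/groupversion_info.go registers your types into a Scheme for you. A rough sketch of what kubebuilder generates there (the Group string follows this article's --domain and group choices, so treat it as an assumption):

package v1

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/scheme"
)

var (
	// GroupVersion is the group/version these objects are registered under.
	GroupVersion = schema.GroupVersion{Group: "custom.my.crd.com", Version: "v1"}

	// SchemeBuilder collects the functions that add the Unit types to a Scheme,
	// populating exactly the gvkToType/typeToGVK maps shown above
	// (unit_types.go calls SchemeBuilder.Register(&Unit{}, &UnitList{})).
	SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

	// AddToScheme is what main.go calls to register the types.
	AddToScheme = SchemeBuilder.AddToScheme
)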

Components

The controller works in the informer pattern from the client-go package: it watches the APIServer for the GVRs under the GVKs it cares about, making full use of the cache, index, and queue. The classic client-go workflow diagram is a good refresher for this flow.

Mapping onto that, kubebuilder has roughly the following major components:

Manager

Kubebuilder's outermost management component, responsible for initializing the controller, cache, and client.

Cache

Kubebuilder's internal component, responsible for creating SharedInformers and watching changes (create/update/delete) to the GVRs under the GVKs of interest, which trigger the Controller's Reconcile logic.

Clients

While working, the controller needs to CRUD resources. The CRUD operations are wrapped in the Client: writes (create/update/delete) go directly to the APIServer, while reads (get/list) are served from the local Cache.
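As a quick illustration, a sketch of mine (it assumes the scaffolded UnitReconciler embeds client.Client, which kubebuilder v2 does by default, and that the module is named Unit; the touch helper is hypothetical):

import (
	"context"

	customv1 "Unit/api/v1"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func (r *UnitReconciler) touch(ctx context.Context, req ctrl.Request) error {
	var unit customv1.Unit
	// read: answered from the local informer cache
	if err := r.Get(ctx, req.NamespacedName, &unit); err != nil {
		return client.IgnoreNotFound(err)
	}
	if unit.Labels == nil {
		unit.Labels = map[string]string{}
	}
	unit.Labels["touched"] = "true"
	// write: sent straight to the APIServer
	return r.Update(ctx, &unit)
}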

Initialization

First, scaffold a CRD project with kubebuilder so we can look inside.

Before initializing, decide on the CRD's name (one that doesn't clash with any existing resource name) and its api groupVersion; a custom api groupVersion is recommended, to keep it distinct from the built-in ones.

List all existing resources:

kubectl api-resources -o wide

List the existing api groupVersions:

kubectl api-versions

My CRD is named Unit, and its api groupVersion is custom/v1.

init

# customize these
export CRD=Unit
export group=custom
export version=v1

mkdir -p CRD/${CRD} && cd CRD/${CRD}

export GO111MODULE=on

# if the path is under GOPATH/src, the go mod step can be skipped
go mod init ${CRD}

# the domain is up to you
kubebuilder init --domain my.crd.com

# generate the API groupVersion for the CRD
# kubebuilder create api --group custom --version v1 --kind Unit
kubebuilder create api --group ${group} --version ${version} --kind ${CRD}

kubebuilder create api prompts for confirmation twice:
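If memory serves, the two prompts ask whether to scaffold the Resource and the Controller; answer y to both:

Create Resource [y/n]
y
Create Controller [y/n]
y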

With the directories created, open the project in an IDE and take a look:

Directory structure

mbp-16in:Unit ywq$ tree
.
├── Dockerfile     # Dockerfile for building the crd-controller image
├── Makefile       # make build targets
├── PROJECT        # project metadata
├── api
│   └── v1
│       ├── groupversion_info.go     # GVK info and the scheme-generation helpers live here
│       ├── unit_types.go            # the custom CRD struct
│       └── zz_generated.deepcopy.go # operations on resource objects start from deepcopied copies
├── bin
│   └── manager    # the compiled Go binary
├── config         # everything that eventually gets kubectl apply'd, split into per-purpose directories; some of it can be customized
│   ├── certmanager
│   │   ├── certificate.yaml
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── crd        # CRD configuration
│   │   ├── kustomization.yaml
│   │   ├── kustomizeconfig.yaml
│   │   └── patches
│   │       ├── cainjection_in_units.yaml
│   │       └── webhook_in_units.yaml
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_webhook_patch.yaml
│   │   └── webhookcainjection_patch.yaml
│   ├── manager    # the manager Deployment lives here
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus # metrics exposure
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac       # RBAC grants
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role_binding.yaml
│   │   ├── unit_editor_role.yaml
│   │   └── unit_viewer_role.yaml
│   ├── samples    # Unit resource sample
│   │   └── custom_v1_unit.yaml
│   └── webhook    # the Unit webhook Service, receiving webhook requests forwarded by the APIServer
│       ├── kustomization.yaml
│       ├── kustomizeconfig.yaml
│       └── service.yaml
├── controllers
│   ├── suite_test.go
│   └── unit_controller.go # the core logic of the CRD controller lives here
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go        # entrypoint

Source code analysis

Starting from main.go:

func main() {
	...
	// new manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:             scheme,
		MetricsBindAddress: metricsAddr,
		Port:               9443,
		LeaderElection:     enableLeaderElection,
		LeaderElectionID:   "68e16627.my.crd.com",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// register reconciler
	if err = (&controllers.UnitReconciler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("Unit"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Unit")
		os.Exit(1)
	}
	// +kubebuilder:scaffold:builder

	setupLog.Info("starting manager")
	// start manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

main() does three things:

  • new manager
  • register reconciler
  • start manager

The internals of these calls live in the dependency packages rather than under the Unit directory. Let's dig into each in turn.

New manager

main.go:57

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{...})

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/alias.go:101`

NewManager = manager.New

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:229`

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
	...

	// create the Cache, which serves the client's reads and produces the informers
	cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
	if err != nil {
		return nil, err
	}

	// apiReader is a read client that goes straight to the APIServer, bypassing the Cache
	apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}

	// create the client for writes; writes connect directly to the APIServer
	writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}
	// recorderProvider records Events, the ones shown by kubectl describe
	recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
	if err != nil {
		return nil, err
	}

	// used for leader election between multiple controller replicas
	resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
		LeaderElection:          options.LeaderElection,
		LeaderElectionID:        options.LeaderElectionID,
		LeaderElectionNamespace: options.LeaderElectionNamespace,
	})
	if err != nil {
		return nil, err
	}

	// expose /metrics for Prometheus to scrape
	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
	if err != nil {
		return nil, err
	}

	...

	return &controllerManager{
		config:       config,
		scheme:       options.Scheme,
		cache:        cache,
		fieldIndexes: cache,
		// writeObj is assigned to the client field
		client:                writeObj,
		apiReader:             apiReader,
		recorderProvider:      recorderProvider,
		resourceLock:          resourceLock,
		mapper:                mapper,
		metricsListener:       metricsListener,
		internalStop:          stop,
		internalStopper:       stop,
		port:                  options.Port,
		host:                  options.Host,
		certDir:               options.CertDir,
		leaseDuration:         *options.LeaseDuration,
		renewDeadline:         *options.RenewDeadline,
		retryPeriod:           *options.RetryPeriod,
		healthProbeListener:   healthProbeListener,
		readinessEndpointName: options.ReadinessEndpointName,
		livenessEndpointName:  options.LivenessEndpointName,
	}, nil
}

Of these steps, the ones worth digging into further are NewCache and NewClient.

NewCache

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:246`

// the options come from here; let's expand setOptionsDefaults to see where NewCache is set
options = setOptionsDefaults(options)
...

// build the cache object with options.NewCache
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
	return nil, err
}

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:351`

func setOptionsDefaults(options Options) Options {
	...

	// Allow newCache to be mocked
	if options.NewCache == nil {
		options.NewCache = cache.New
	}
	...
}

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/cache/cache.go:110`

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}

As you can see, this is where an Informer gets created for every GVK that needs to be watched, according to the configuration. Where those GVKs are configured will come up in a later post.

NewClient

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:256`

// the options come from here; let's expand setOptionsDefaults to see where NewClient is set
options = setOptionsDefaults(options)

// build the client object with options.NewClient
writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
	return nil, err
}

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:351`

// setOptionsDefaults set default values for Options fields
func setOptionsDefaults(options Options) Options {
	...

	// Allow newClient to be mocked
	if options.NewClient == nil {
		options.NewClient = defaultNewClient
	}

	...
}

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:320`

func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
	// the direct client; writes go straight to the APIServer
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}

	return &client.DelegatingClient{
		Reader: &client.DelegatingReader{
			// reads are served from the manager's cache
			CacheReader: cache,
			// fallback reader that hits the APIServer directly (e.g. for unstructured objects)
			ClientReader: c,
		},
		Writer:       c,
		StatusClient: c,
	}, nil
}

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/client/client.go:54`

// The client uses the scheme to look up the GVK that a given resource type belongs to.
// In short, this client talks to the APIServer directly, with the scheme handling
// serialization and deserialization.
func New(config *rest.Config, options Options) (Client, error) {
	if config == nil {
		return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
	}

	// Init a scheme if none provided
	if options.Scheme == nil {
		options.Scheme = scheme.Scheme
	}

	// Init a Mapper if none provided
	if options.Mapper == nil {
		var err error
		options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
		if err != nil {
			return nil, err
		}
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	c := &client{
		typedClient: typedClient{
			cache: clientCache{
				config:         config,
				scheme:         options.Scheme,
				mapper:         options.Mapper,
				codecs:         serializer.NewCodecFactory(options.Scheme),
				resourceByType: make(map[reflect.Type]*resourceMeta),
			},
			paramCodec: runtime.NewParameterCodec(options.Scheme),
		},
		unstructuredClient: unstructuredClient{
			client:     dynamicClient,
			restMapper: options.Mapper,
		},
	}

	return c, nil
}

register reconciler

The reconciler is the controller; calling it a reconciler (coordinator) is simply the more fitting name. The controller's core logic lives here.

main.go:69

if err = (&controllers.UnitReconciler{
	Client: mgr.GetClient(),
	Log:    ctrl.Log.WithName("controllers").WithName("Unit"),
	Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
	setupLog.Error(err, "unable to create controller", "controller", "Unit")
	os.Exit(1)
}

==> controllers/unit_controller.go:49

func (r *UnitReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&customv1.Unit{}).
		Complete(r)
}

The Complete method finalizes the Builder and builds the Controller from it.
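For completeness: if the Unit managed Own Resources, they would be declared on the Builder with Owns() before Complete(). A sketch (the StatefulSet is just an example of mine, not part of this scaffold):

func (r *UnitReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&customv1.Unit{}).       // the Owner Resource: the CRD itself
		Owns(&appsv1.StatefulSet{}). // an Own Resource created by Unit; this feeds doWatch's managedObjects below
		Complete(r)
}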

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:128`

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:134`

// Build builds the Application ControllerManagedBy and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
	...

	// Set the ControllerManagedBy
	if err := blder.doController(r); err != nil {
		return nil, err
	}

	// Set the Watch
	if err := blder.doWatch(); err != nil {
		return nil, err
	}

	return blder.ctrl, nil
}

The Builder's key methods are doController() and doWatch(); let's look at each.

doController

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:145`

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:213`

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:35`

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/controller/controller.go:63`

// New returns a new Controller registered with the Manager.  The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// Create controller with dependencies set
	c := &controller.Controller{
		Do:       options.Reconciler,
		Cache:    mgr.GetCache(),
		Config:   mgr.GetConfig(),
		Scheme:   mgr.GetScheme(),
		Client:   mgr.GetClient(),
		Recorder: mgr.GetEventRecorderFor(name),
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		Name:                    name,
	}

	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

As you can see, doController creates the Controller and registers it with the Manager, the outer body that takes care of running it.

The fields inside the Controller struct instance are as follows:

c := &controller.Controller{
	// Reconciler has a single interface method, Reconcile(): the core of the CRD's
	// control logic. Kubebuilder scaffolds it, but the body must be filled in (see below)
	Do: options.Reconciler,
	// Cache hooks up to the informers, tracks GVR state in a local cache, and serves reads
	Cache:  mgr.GetCache(),
	Config: mgr.GetConfig(),
	// serialization/deserialization of resource instance types
	Scheme: mgr.GetScheme(),
	// Client serves writes, going straight to the APIServer
	Client: mgr.GetClient(),
	// records Events
	Recorder: mgr.GetEventRecorderFor(name),
	// workqueue
	MakeQueue: func() workqueue.RateLimitingInterface {
		return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
	},
	// cap on the reconciler's concurrency
	MaxConcurrentReconciles: options.MaxConcurrentReconciles,
	Name:                    name,
}

==> controllers/unit_controller.go:40

func (r *UnitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	_ = context.Background()
	_ = r.Log.WithValues("unit", req.NamespacedName)

	// your logic here

	return ctrl.Result{}, nil
}

Here is the Reconcile() method; the logic is yours to implement. Later posts will describe my requirements, design, and code.
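As a taste of what goes in there, a minimal hedged skeleton of mine (the fetch-and-ignore-not-found pattern is conventional; the customv1 and client imports are assumed):

func (r *UnitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("unit", req.NamespacedName)

	var unit customv1.Unit
	if err := r.Get(ctx, req.NamespacedName, &unit); err != nil {
		// the Unit was deleted after the event was queued; nothing left to do
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	log.Info("reconciling")
	// compare unit.Spec against the live state of its Own Resources here,
	// and create/update/delete them until they converge

	return ctrl.Result{}, nil
}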

doWatch

func (blder *Builder) doWatch() error {
	// Reconcile type
	src := &source.Kind{Type: blder.apiType}
	// register handler
	hdler := &handler.EnqueueRequestForObject{}

	// Watch changes to the CRD resource itself
	err := blder.ctrl.Watch(src, hdler, blder.predicates...)
	if err != nil {
		return err
	}

	// Watch changes to the Own Resources managed by the CRD
	for _, obj := range blder.managedObjects {
		src := &source.Kind{Type: obj}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.apiType,
			IsController: true,
		}
		if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
			return err
		}
	}

	for _, w := range blder.watchRequest {
		if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
			return err
		}
	}
	return nil
}

doWatch() does two main things: watch for changes to the CRD resource itself, and watch for changes to the CRD's Own Resources. Once a change is seen, what happens next? It is handed to the handler, of course. Let's see what the handler created on the second line of that code does.

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/handler/enqueue.go:34`

type EnqueueRequestForObject struct{}

// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Meta == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Meta.GetName(),
		Namespace: evt.Meta.GetNamespace(),
	}})
}

// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
	if evt.MetaOld != nil {
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.MetaOld.GetName(),
			Namespace: evt.MetaOld.GetNamespace(),
		}})
	} else {
		enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
	}

	if evt.MetaNew != nil {
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.MetaNew.GetName(),
			Namespace: evt.MetaNew.GetNamespace(),
		}})
	} else {
		enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
	}
}

// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if evt.Meta == nil {
		enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
		return
	}
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Meta.GetName(),
		Namespace: evt.Meta.GetNamespace(),
	}})
}

As expected, the handler takes the NamespacedName of the object from each create/update/delete event and pushes it onto the workqueue, while on the other end the Reconciler watching the workqueue quietly goes to work.
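One more detail: the blder.predicates passed as the variadic argument to every Watch above are event filters that can drop events before they reach the handler at all. A small illustrative example (the generation check is my choice, not from the scaffold):

import (
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// skip updates that don't bump metadata.generation (e.g. status-only writes)
var pred = predicate.Funcs{
	UpdateFunc: func(e event.UpdateEvent) bool {
		return e.MetaOld.GetGeneration() != e.MetaNew.GetGeneration()
	},
}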

With all the registration work done, we return to main.go and start the outer managing component, the Manager.

Starting the Manager

Starting the Manager happens in two steps: first get the Cache component ready, then start the controller component.

Starting the Cache

main.go:80

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	setupLog.Error(err, "problem running manager")
	os.Exit(1)
}

ctrl.NewManager() above ultimately returns a pointer to a controllerManager{} object, so let's look up its Start() method.

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:403`

func (cm *controllerManager) Start(stop <-chan struct{}) error {
	...

	go cm.startNonLeaderElectionRunnables()

	// With multiple controller replicas, leader election runs and only the instance
	// holding the leader lock starts working. Internally, startNonLeaderElectionRunnables
	// and startLeaderElectionRunnables work essentially the same way.
	if cm.resourceLock != nil {
		err := cm.startLeaderElection()
		if err != nil {
			return err
		}
	} else {
		go cm.startLeaderElectionRunnables()
	}

	...
}

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:465`

func (cm *controllerManager) startLeaderElectionRunnables() {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	// the part to focus on here is waitForCache
	cm.waitForCache()

	// Start the leader election Runnables after the cache has synced
	for _, c := range cm.leaderElectionRunnables {
		// Controllers block, but we want to return an error if any have an error starting.
		// Write any Start errors to a channel so we can return them
		ctrl := c
		go func() {
			if err := ctrl.Start(cm.internalStop); err != nil {
				cm.errSignal.SignalError(err)
			}
			// we use %T here because we don't have a good stand-in for "name",
			// and the full runnable might not serialize (mutexes, etc)
			log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
		}()
	}

	cm.startedLeader = true
}

==> `/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:489`


func (cm *controllerManager) waitForCache() {
	if cm.started {
		return
	}

	// Start the Cache. Allow the function to start the cache to be mocked out for testing
	if cm.startCache == nil {
		cm.startCache = cm.cache.Start
	}
	go func() {
		if err := cm.startCache(cm.internalStop); err != nil {
			cm.errSignal.SignalError(err)
		}
	}()

	// Wait for the caches to sync.
	// TODO(community): Check the return value and write a test
	cm.cache.WaitForCacheSync(cm.internalStop)
	cm.started = true
}

waitForCache's main job is to start the Cache and wait for its first full sync to complete.

Starting the cache involves creating the FIFO queue and initializing the informers, reflectors, local store cache, indexes, and so on, as sketched below.
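For reference, each informer the Cache starts does roughly what this bare client-go sketch does; a simplification of mine, assuming the pre-context client-go APIs that pair with controller-runtime v0.5.0:

package main

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// the ListWatch feeds the reflector, which pushes deltas into a FIFO queue
	lw := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			return clientset.CoreV1().Pods("").List(opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			return clientset.CoreV1().Pods("").Watch(opts)
		},
	}

	// the shared informer owns the reflector, the FIFO, the local store and its indexes
	informer := cache.NewSharedIndexInformer(lw, &corev1.Pod{}, 30*time.Second,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

	stop := make(chan struct{})
	go informer.Run(stop)
	cache.WaitForCacheSync(stop, informer.HasSynced) // the same "first sync" the manager waits for
	<-stop
}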

Starting the controller

The controller's working model after startup has been covered many times in the earlier controller series; here's a quick recap.

`/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/internal/controller/controller.go:146`

func (c *Controller) Start(stop <-chan struct{}) error {
	...

	// Launch workers to process resources
	log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
	for i := 0; i < c.MaxConcurrentReconciles; i++ {
		// multiple workers, each re-run on an interval (1s) until stopped
		go wait.Until(c.worker, c.JitterPeriod, stop)
	}

	...
}

func (c *Controller) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.Queue.Get()
	if shutdown {
		// Stop working
		return false
	}

	defer c.Queue.Done(obj)
	return c.reconcileHandler(obj)
}

func (c *Controller) reconcileHandler(obj interface{}) bool {
	...

	// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
	// resource to be synced.
	if result, err := c.Do.Reconcile(req); err != nil {
		...
	}

	...
}

The worker's innermost logic ends up back at the Reconcile method, i.e. the Reconcile() in controllers/unit_controller.go:40 whose custom logic you fill in; it runs whatever you define there.

Summary

If you have read the source of the built-in resources' Controllers before and understand how a Controller works, Kubebuilder will feel like familiar ground.

In short, a controller created by kubebuilder does exactly what the workflow diagram at the top describes; kubebuilder just wraps it all up so thoroughly that you only need to implement the body of the Reconcile method, with every intermediate step of the standard controller flow handled for you.

That wraps up this tour of kubebuilder's core. The next post starts the actual design of the Unit CRD, for example which Own Resources a Unit manages, and how kubebuilder helps with authorization, managing the Own Resources, and keeping the Unit and its Own Resources in sync.
